package org.nbict.iot.protocol.task;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

import java.util.ArrayList;
import java.util.List;

/**
 * Trident function that assigns each diagnosis event to an hourly bucket.
 *
 * <p>For every input tuple it emits two values: the hour-since-epoch number
 * derived from the event's {@code time} field, and a composite key of the form
 * {@code city:diagnosisCode:hourSinceEpoch}.
 *
 * <p>Created by songseven on 18/6/25.
 */
public class HourAssignment extends BaseFunction {

    /**
     * Divisor used to turn the event time into an hour index.
     * NOTE(review): assumes {@code DiagnosisEvent.time} is in milliseconds
     * since the epoch - confirm against the producer of the events.
     */
    private static final long MILLIS_PER_HOUR = 60L * 60L * 1000L;

    /**
     * Computes the hourly bucket and grouping key for one tuple and emits them.
     *
     * @param tuple     input tuple; value 0 is read as a {@code DiagnosisEvent}
     *                  and value 1 as the city name ({@code String})
     * @param collector collector that receives the two output values, in order:
     *                  the hour index ({@code Long}) and the composite key
     *                  ({@code String})
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector
            collector) {

        DiagnosisEvent diagnosis = (DiagnosisEvent) tuple.getValue(0);
        String city = (String) tuple.getValue(1);

        long timestamp = diagnosis.time;
        // Truncating division, same result as the chained "/ 1000 / 60 / 60".
        long hourSinceEpoch = timestamp / MILLIS_PER_HOUR;

        String key = city + ":" + diagnosis.diagnosisCode + ":" + hourSinceEpoch;

        // Typed list (no raw types); exactly two output values are emitted.
        List<Object> values = new ArrayList<>(2);
        values.add(hourSinceEpoch);
        values.add(key);
        collector.emit(values);
    }
}
